{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.\n",
        "\n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/automated-machine-learning/continuous-retraining/auto-ml-continuous-retraining.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Automated Machine Learning \n",
        "**Continuous retraining using Pipelines and Time-Series TabularDataset**\n",
        "## Contents\n",
        "1. [Introduction](#Introduction)\n",
        "2. [Setup](#Setup)\n",
        "3. [Compute](#Compute)\n",
        "4. [Run Configuration](#Run-Configuration)\n",
        "5. [Data Ingestion Pipeline](#Data-Ingestion-Pipeline)\n",
        "6. [Training Pipeline](#Training-Pipeline)\n",
        "7. [Publish Retraining Pipeline and Schedule](#Publish-Retraining-Pipeline-and-Schedule)\n",
        "8. [Test Retraining](#Test-Retraining)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Introduction\n",
        "In this example we use AutoML and Pipelines to enable contious retraining of a model based on updates to the training dataset. We will create two pipelines, the first one to demonstrate a training dataset that gets updated over time. We leverage time-series capabilities of `TabularDataset` to achieve this. The second pipeline utilizes pipeline `Schedule` to trigger continuous retraining. \n",
        "Make sure you have executed the [configuration notebook](../../../configuration.ipynb) before running this notebook.\n",
        "In this notebook you will learn how to:\n",
        "* Create an Experiment in an existing Workspace.\n",
        "* Configure AutoML using AutoMLConfig.\n",
        "* Create data ingestion pipeline to update a time-series based TabularDataset\n",
        "* Create training pipeline to prepare data, run AutoML, register the model and setup pipeline triggers.\n",
        "\n",
        "## Setup\n",
        "As part of the setup you have already created an Azure ML `Workspace` object. For AutoML you will need to create an `Experiment` object, which is a named object in a `Workspace` used to run experiments."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import logging\n",
        "\n",
        "from matplotlib import pyplot as plt\n",
        "import numpy as np\n",
        "import pandas as pd\n",
        "from sklearn import datasets\n",
        "\n",
        "import azureml.core\n",
        "from azureml.core.experiment import Experiment\n",
        "from azureml.core.workspace import Workspace\n",
        "from azureml.train.automl import AutoMLConfig"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "This sample notebook may use features that are not available in previous versions of the Azure ML SDK."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Accessing the Azure ML workspace requires authentication with Azure.\n",
        "\n",
        "The default authentication is interactive authentication using the default tenant. Executing the ws = Workspace.from_config() line in the cell below will prompt for authentication the first time that it is run.\n",
        "\n",
        "If you have multiple Azure tenants, you can specify the tenant by replacing the ws = Workspace.from_config() line in the cell below with the following:\n",
        "```\n",
        "from azureml.core.authentication import InteractiveLoginAuthentication\n",
        "auth = InteractiveLoginAuthentication(tenant_id = 'mytenantid')\n",
        "ws = Workspace.from_config(auth = auth)\n",
        "```\n",
        "If you need to run in an environment where interactive login is not possible, you can use Service Principal authentication by replacing the ws = Workspace.from_config() line in the cell below with the following:\n",
        "```\n",
        "from azureml.core.authentication import ServicePrincipalAuthentication\n",
        "auth = auth = ServicePrincipalAuthentication('mytenantid', 'myappid', 'mypassword')\n",
        "ws = Workspace.from_config(auth = auth)\n",
        "```\n",
        "For more details, see aka.ms/aml-notebook-auth"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "dstor = ws.get_default_datastore()\n",
        "\n",
        "# Choose a name for the run history container in the workspace.\n",
        "experiment_name = \"retrain-noaaweather\"\n",
        "experiment = Experiment(ws, experiment_name)\n",
        "\n",
        "output = {}\n",
        "output[\"Subscription ID\"] = ws.subscription_id\n",
        "output[\"Workspace\"] = ws.name\n",
        "output[\"Resource Group\"] = ws.resource_group\n",
        "output[\"Location\"] = ws.location\n",
        "output[\"Run History Name\"] = experiment_name\n",
        "output[\"SDK Version\"] = azureml.core.VERSION\n",
        "pd.set_option(\"display.max_colwidth\", None)\n",
        "outputDf = pd.DataFrame(data=output, index=[\"\"])\n",
        "outputDf.T"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Compute \n",
        "\n",
        "#### Create or Attach existing AmlCompute\n",
        "\n",
        "You will need to create a compute target for your AutoML run. In this tutorial, you create AmlCompute as your training compute resource.\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist.\n",
        "\n",
        "#### Creation of AmlCompute takes approximately 5 minutes. \n",
        "If the AmlCompute with that name is already in your workspace this code will skip the creation process.\n",
        "As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Please read [this article](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-manage-quotas) on the default limits and how to request more quota."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute import ComputeTarget, AmlCompute\n",
        "from azureml.core.compute_target import ComputeTargetException\n",
        "\n",
        "# Choose a name for your CPU cluster\n",
        "amlcompute_cluster_name = \"cont-cluster\"\n",
        "\n",
        "# Verify that cluster does not exist already\n",
        "try:\n",
        "    compute_target = ComputeTarget(workspace=ws, name=amlcompute_cluster_name)\n",
        "    print(\"Found existing cluster, use it.\")\n",
        "except ComputeTargetException:\n",
        "    compute_config = AmlCompute.provisioning_configuration(\n",
        "        vm_size=\"STANDARD_DS12_V2\", max_nodes=4\n",
        "    )\n",
        "    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)\n",
        "compute_target.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Run Configuration"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.runconfig import CondaDependencies, RunConfiguration\n",
        "\n",
        "# create a new RunConfig object\n",
        "conda_run_config = RunConfiguration(framework=\"python\")\n",
        "\n",
        "# Set compute target to AmlCompute\n",
        "conda_run_config.target = compute_target\n",
        "\n",
        "conda_run_config.environment.docker.enabled = True\n",
        "\n",
        "cd = CondaDependencies.create(\n",
        "    pip_packages=[\n",
        "        \"azureml-sdk[automl]\",\n",
        "        \"applicationinsights\",\n",
        "        \"azureml-opendatasets\",\n",
        "        \"azureml-defaults\",\n",
        "    ],\n",
        "    conda_packages=[\"numpy==1.19.5\"],\n",
        "    pin_sdk_version=False,\n",
        ")\n",
        "conda_run_config.environment.python.conda_dependencies = cd\n",
        "\n",
        "print(\"run config is ready\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Data Ingestion Pipeline \n",
        "For this demo, we will use NOAA weather data from [Azure Open Datasets](https://azure.microsoft.com/services/open-datasets/). You can replace this with your own dataset, or you can skip this pipeline if you already have a time-series based `TabularDataset`.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# The name and target column of the Dataset to create\n",
        "dataset = \"NOAA-Weather-DS4\"\n",
        "target_column_name = \"temperature\""
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "\n",
        "### Upload Data Step\n",
        "The data ingestion pipeline has a single step with a script to query the latest weather data and upload it to the blob store. During the first run, the script will create and register a time-series based `TabularDataset` with the past one week of weather data. For each subsequent run, the script will create a partition in the blob store by querying NOAA for new weather data since the last modified time of the dataset (`dataset.data_changed_time`) and creating a data.csv file."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Pipeline, PipelineParameter\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "\n",
        "ds_name = PipelineParameter(name=\"ds_name\", default_value=dataset)\n",
        "upload_data_step = PythonScriptStep(\n",
        "    script_name=\"upload_weather_data.py\",\n",
        "    allow_reuse=False,\n",
        "    name=\"upload_weather_data\",\n",
        "    arguments=[\"--ds_name\", ds_name],\n",
        "    compute_target=compute_target,\n",
        "    runconfig=conda_run_config,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Submit Pipeline Run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "data_pipeline = Pipeline(\n",
        "    description=\"pipeline_with_uploaddata\", workspace=ws, steps=[upload_data_step]\n",
        ")\n",
        "data_pipeline_run = experiment.submit(\n",
        "    data_pipeline, pipeline_parameters={\"ds_name\": dataset}\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "data_pipeline_run.wait_for_completion(show_output=False)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Training Pipeline\n",
        "### Prepare Training Data Step\n",
        "\n",
        "Script to check if new data is available since the model was last trained. If no new data is available, we cancel the remaining pipeline steps. We need to set allow_reuse flag to False to allow the pipeline to run even when inputs don't change. We also need the name of the model to check the time the model was last trained."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineData\n",
        "\n",
        "# The model name with which to register the trained model in the workspace.\n",
        "model_name = PipelineParameter(\"model_name\", default_value=\"noaaweatherds\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "data_prep_step = PythonScriptStep(\n",
        "    script_name=\"check_data.py\",\n",
        "    allow_reuse=False,\n",
        "    name=\"check_data\",\n",
        "    arguments=[\"--ds_name\", ds_name, \"--model_name\", model_name],\n",
        "    compute_target=compute_target,\n",
        "    runconfig=conda_run_config,\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core import Dataset\n",
        "\n",
        "train_ds = Dataset.get_by_name(ws, dataset)\n",
        "train_ds = train_ds.drop_columns([\"partition_date\"])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### AutoMLStep\n",
        "Create an AutoMLConfig and a training step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.train.automl import AutoMLConfig\n",
        "from azureml.pipeline.steps import AutoMLStep\n",
        "\n",
        "automl_settings = {\n",
        "    \"iteration_timeout_minutes\": 10,\n",
        "    \"experiment_timeout_hours\": 0.25,\n",
        "    \"n_cross_validations\": 3,\n",
        "    \"primary_metric\": \"r2_score\",\n",
        "    \"max_concurrent_iterations\": 3,\n",
        "    \"max_cores_per_iteration\": -1,\n",
        "    \"verbosity\": logging.INFO,\n",
        "    \"enable_early_stopping\": True,\n",
        "}\n",
        "\n",
        "automl_config = AutoMLConfig(\n",
        "    task=\"regression\",\n",
        "    debug_log=\"automl_errors.log\",\n",
        "    path=\".\",\n",
        "    compute_target=compute_target,\n",
        "    training_data=train_ds,\n",
        "    label_column_name=target_column_name,\n",
        "    **automl_settings,\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineData, TrainingOutput\n",
        "\n",
        "metrics_output_name = \"metrics_output\"\n",
        "best_model_output_name = \"best_model_output\"\n",
        "\n",
        "metrics_data = PipelineData(\n",
        "    name=\"metrics_data\",\n",
        "    datastore=dstor,\n",
        "    pipeline_output_name=metrics_output_name,\n",
        "    training_output=TrainingOutput(type=\"Metrics\"),\n",
        ")\n",
        "model_data = PipelineData(\n",
        "    name=\"model_data\",\n",
        "    datastore=dstor,\n",
        "    pipeline_output_name=best_model_output_name,\n",
        "    training_output=TrainingOutput(type=\"Model\"),\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "automl_step = AutoMLStep(\n",
        "    name=\"automl_module\",\n",
        "    automl_config=automl_config,\n",
        "    outputs=[metrics_data, model_data],\n",
        "    allow_reuse=False,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Register Model Step\n",
        "Script to register the model to the workspace. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "register_model_step = PythonScriptStep(\n",
        "    script_name=\"register_model.py\",\n",
        "    name=\"register_model\",\n",
        "    allow_reuse=False,\n",
        "    arguments=[\n",
        "        \"--model_name\",\n",
        "        model_name,\n",
        "        \"--model_path\",\n",
        "        model_data,\n",
        "        \"--ds_name\",\n",
        "        ds_name,\n",
        "    ],\n",
        "    inputs=[model_data],\n",
        "    compute_target=compute_target,\n",
        "    runconfig=conda_run_config,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Submit Pipeline Run"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "training_pipeline = Pipeline(\n",
        "    description=\"training_pipeline\",\n",
        "    workspace=ws,\n",
        "    steps=[data_prep_step, automl_step, register_model_step],\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "training_pipeline_run = experiment.submit(\n",
        "    training_pipeline,\n",
        "    pipeline_parameters={\"ds_name\": dataset, \"model_name\": \"noaaweatherds\"},\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "training_pipeline_run.wait_for_completion(show_output=False)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Publish Retraining Pipeline and Schedule\n",
        "Once we are happy with the pipeline, we can publish the training pipeline to the workspace and create a schedule to trigger on blob change. The schedule polls the blob store where the data is being uploaded and runs the retraining pipeline if there is a data change. A new version of the model will be registered to the workspace once the run is complete."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_name = \"Retraining-Pipeline-NOAAWeather\"\n",
        "\n",
        "published_pipeline = training_pipeline.publish(\n",
        "    name=pipeline_name, description=\"Pipeline that retrains AutoML model\"\n",
        ")\n",
        "\n",
        "published_pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Schedule\n",
        "\n",
        "schedule = Schedule.create(\n",
        "    workspace=ws,\n",
        "    name=\"RetrainingSchedule\",\n",
        "    pipeline_parameters={\"ds_name\": dataset, \"model_name\": \"noaaweatherds\"},\n",
        "    pipeline_id=published_pipeline.id,\n",
        "    experiment_name=experiment_name,\n",
        "    datastore=dstor,\n",
        "    wait_for_provisioning=True,\n",
        "    polling_interval=1440,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Test Retraining\n",
        "Here we setup the data ingestion pipeline to run on a schedule, to verify that the retraining pipeline runs as expected. \n",
        "\n",
        "Note: \n",
        "* Azure NOAA Weather data is updated daily and retraining will not trigger if there is no new data available. \n",
        "* Depending on the polling interval set in the schedule, the retraining may take some time trigger after data ingestion pipeline completes."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_name = \"DataIngestion-Pipeline-NOAAWeather\"\n",
        "\n",
        "published_pipeline = training_pipeline.publish(\n",
        "    name=pipeline_name, description=\"Pipeline that updates NOAAWeather Dataset\"\n",
        ")\n",
        "\n",
        "published_pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import Schedule\n",
        "\n",
        "schedule = Schedule.create(\n",
        "    workspace=ws,\n",
        "    name=\"RetrainingSchedule-DataIngestion\",\n",
        "    pipeline_parameters={\"ds_name\": dataset},\n",
        "    pipeline_id=published_pipeline.id,\n",
        "    experiment_name=experiment_name,\n",
        "    datastore=dstor,\n",
        "    wait_for_provisioning=True,\n",
        "    polling_interval=1440,\n",
        ")"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "vivijay"
      }
    ],
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.6"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}